export const description =
  'Learn how to implement Server-Sent Events (SSE) in Robyn for real-time server-to-client communication.'

# Server-Sent Events (SSE)

After learning about [form data handling](/documentation/en/api_reference/form_data), Batman realized he needed a way to push real-time updates to his crime monitoring dashboard. Criminals don't wait for Batman to refresh his browser!

He discovered Server-Sent Events (SSE) - a perfect solution for one-way communication from server to client over HTTP. SSE allows Batman to stream live data to his dashboard without the complexity of full bidirectional communication.

"This is exactly what I need for my crime alerts!" Batman exclaimed. "I can push updates to the dashboard instantly when new crimes are detected."

Server-Sent Events are ideal for:
- Real-time notifications
- Live data feeds
- Progress updates
- Chat applications (server-to-client only)
- Dashboard updates
- Log streaming

## How does it work?

<Row>
<Col>

Batman can create Server-Sent Events streams by using the `SSEResponse` and `SSEMessage` classes. He can use both regular generators and async generators depending on his needs:

- **Regular generators**: Perfect for simple data streams or when working with synchronous operations
- **Async generators**: Ideal when Batman needs to perform async operations like database queries or API calls within the stream

</Col>

<Col sticky>

<CodeGroup title="SSE Response" tag="GET" label="/events">

```python {{ title: 'Basic SSE Stream' }}
from robyn import Robyn, SSEResponse, SSEMessage
import time

app = Robyn(__file__)

@app.get("/events")
def stream_events(request):
    def event_generator():
        for i in range(10):
            yield SSEMessage(f"Event {i}", id=str(i))
            time.sleep(1)
    
    return SSEResponse(event_generator())
```
```python {{ title: 'JSON Data Stream' }}
from robyn import Robyn, SSEResponse, SSEMessage
import json
import time

app = Robyn(__file__)

@app.get("/events/json")
def stream_json_events(request):
    def json_event_generator():
        for i in range(5):
            data = {
                "id": i,
                "message": f"Update {i}",
                "timestamp": time.time()
            }
            yield SSEMessage(
                json.dumps(data), 
                event="update", 
                id=str(i)
            )
            time.sleep(2)
    return SSEResponse(json_event_generator())
```

```python {{ title: 'Async Generator Stream' }}
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import time

app = Robyn(__file__)

@app.get("/events/async")
async def stream_async_events(request):
    async def async_event_generator():
        for i in range(8):
            # Simulate async work like database calls
            await asyncio.sleep(0.5)
            yield SSEMessage(
                f"Async message {i} - {time.strftime('%H:%M:%S')}", 
                event="async", 
                id=str(i)
            )
    
    return SSEResponse(async_event_generator())
```

</CodeGroup>
</Col>
</Row>

---

## Async Generators

<Row>
<Col>

When Batman needs to perform async operations during his SSE streams - like fetching data from databases or making API calls - he uses async generators with `async def` and `await`. This allows him to handle multiple streams concurrently without blocking other operations.

The key difference is using `async def` for the generator function and `await` for async operations inside the generator:

</Col>

<Col sticky>

<CodeGroup title="Advanced Async SSE" tag="GET" label="/events/database">

```python {{ title: 'Database Stream' }}
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import json
import time

app = Robyn(__file__)

@app.get("/events/database")
async def stream_database_events(request):
    async def database_event_generator():
        for i in range(10):
            # Simulate async database query
            await asyncio.sleep(0.3)
            
            # Simulate fetching data from database
            data = {
                "crime_id": i,
                "location": f"Gotham District {i}",
                "severity": "high" if i % 2 == 0 else "low",
                "timestamp": time.time()
            }
            
            yield SSEMessage(
                json.dumps(data),
                event="crime_alert",
                id=str(i)
            )
    
    return SSEResponse(database_event_generator())
```

```python {{ title: 'API Integration Stream' }}
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import aiohttp
import json

app = Robyn(__file__)

@app.get("/events/api")
async def stream_api_events(request):
    async def api_event_generator():
        async with aiohttp.ClientSession() as session:
            for i in range(5):
                try:
                    # Make async API call
                    async with session.get(f"https://api.example.com/data/{i}") as response:
                        data = await response.json()
                        
                        yield SSEMessage(
                            json.dumps(data),
                            event="api_update",
                            id=str(i)
                        )
                except Exception as e:
                    yield SSEMessage(
                        f"Error fetching data: {str(e)}",
                        event="error",
                        id=f"error_{i}"
                    )
                
                await asyncio.sleep(1)
    
    return SSEResponse(api_event_generator())
```

</CodeGroup>
</Col>
</Row>

---

## What's next?

Batman has mastered Server-Sent Events and can now stream real-time updates to his crime dashboard. While SSE is perfect for one-way communication from server to client, Batman realizes he needs bidirectional communication for more interactive features like real-time chat with his allies.

Next, he wants to explore how to handle bidirectional communication with [WebSockets](/documentation/en/api_reference/websockets) for more interactive features.

If Batman needs to handle unexpected situations, he'll learn about [Exception handling](/documentation/en/api_reference/exceptions) to make his applications more robust.

For scaling his crime monitoring system across multiple processes, Batman will explore [Scaling the Application](/documentation/en/api_reference/scaling).

